// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include <ctime>
#include <memory>
#include <string>
#include <thread>

#include "db/db_test_util.h"
#include "db/memtable.h"
#include "db/range_del_aggregator.h"
#include "port/stack_trace.h"
#include "pure_mem/std_out_logger.h"
#include "rangearena/range_arena.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice_transform.h"

namespace rocksdb {

    class DBMemTableTest : public DBTestBase {
    public:
        DBMemTableTest() : DBTestBase("/db_memtable_test") {}

        std::string Get2(const Slice& k) {
            ReadOptions options;
            options.verify_checksums = true;
            std::string result;
            Status s = db_->Get(options, k, &result);
            if (s.IsNotFound()) {
                result = "NOT_FOUND";
            } else if (!s.ok()) {
                result = s.ToString();
            }
            return result;
        }
    };

    class MockMemTableRep : public MemTableRep {
    public:
        explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
                : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}

        KeyHandle AllocatePure(const size_t len, char** buf, const Slice& key,
                               void** rangearena) override {
            return rep_->AllocatePure(len, buf, key, rangearena);
        }

        void Insert(KeyHandle handle) override { rep_->Insert(handle); }

        void InsertWithHint(KeyHandle handle, void** hint) override {
            num_insert_with_hint_++;
            EXPECT_NE(nullptr, hint);
            last_hint_in_ = *hint;
            rep_->InsertWithHint(handle, hint);
            last_hint_out_ = *hint;
        }

        bool Contains(const char* key) const override { return rep_->Contains(key); }

        void Get(const LookupKey& k, void* callback_args,
                 bool (*callback_func)(void* arg, const char* entry)) override {
            rep_->Get(k, callback_args, callback_func);
        }

        size_t ApproximateMemoryUsage() override {
            return rep_->ApproximateMemoryUsage();
        }

        Iterator* GetIterator(Arena* arena) override {
            return rep_->GetIterator(arena);
        }

        void* last_hint_in() { return last_hint_in_; }
        void* last_hint_out() { return last_hint_out_; }
        int num_insert_with_hint() { return num_insert_with_hint_; }

    private:
        std::unique_ptr<MemTableRep> rep_;
        void* last_hint_in_;
        void* last_hint_out_;
        int num_insert_with_hint_;
    };

    class MockMemTableRepFactory : public MemTableRepFactory {
    public:
        MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
                                       Allocator* allocator,
                                       const SliceTransform* transform,
                                       Logger* logger) override {
            PureMemFactory factory;
            MemTableRep* skiplist_rep =
                    factory.CreateMemTableRep(cmp, allocator, transform, logger);
            mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
            return mock_rep_;
        }

        MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
                                       Allocator* allocator,
                                       const SliceTransform* transform,
                                       Logger* logger,
                                       uint32_t column_family_id) override {
            last_column_family_id_ = column_family_id;
            return CreateMemTableRep(cmp, allocator, transform, logger);
        }

        const char* Name() const override { return "MockMemTableRepFactory"; }

        MockMemTableRep* rep() { return mock_rep_; }

        bool IsInsertConcurrentlySupported() const override { return false; }

        uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }

    private:
        MockMemTableRep* mock_rep_;
        // workaround since there's no port::kMaxUint32 yet.
        uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
    };

    class TestPrefixExtractor : public SliceTransform {
    public:
        const char* Name() const override { return "TestPrefixExtractor"; }

        Slice Transform(const Slice& key) const override {
            const char* p = separator(key);
            if (p == nullptr) {
                return Slice();
            }
            return Slice(key.data(), p - key.data() + 1);
        }

        bool InDomain(const Slice& key) const override {
            return separator(key) != nullptr;
        }

        bool InRange(const Slice& /*key*/) const override { return false; }

    private:
        const char* separator(const Slice& key) const {
            return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
        }
    };

    Slice genMVCCKey1(const char* key) {
        uint32_t keySize = strlen(key) + 1;
        char* ret = new char[keySize];
        memset(ret, '\0', keySize);
        memcpy(ret, key, strlen(key));
        return Slice(ret, keySize);
    }

    void EncodeUint32(std::string* buf, uint32_t v) {
        const uint8_t tmp[sizeof(v)] = {
                uint8_t(v >> 24),
                uint8_t(v >> 16),
                uint8_t(v >> 8),
                uint8_t(v),
        };
        buf->append(reinterpret_cast<const char*>(tmp), sizeof(tmp));
    }


    void EncodeUint64(std::string* buf, uint64_t v) {
        const uint8_t tmp[sizeof(v)] = {
                uint8_t(v >> 56), uint8_t(v >> 48), uint8_t(v >> 40), uint8_t(v >> 32),
                uint8_t(v >> 24), uint8_t(v >> 16), uint8_t(v >> 8),  uint8_t(v),
        };
        buf->append(reinterpret_cast<const char*>(tmp), sizeof(tmp));
    }

    void EncodeTimestamp(std::string& s, int64_t wall_time, int32_t logical) {
        EncodeUint64(&s, uint64_t(wall_time));
        if (logical != 0) {
            EncodeUint32(&s, uint32_t(logical));
        }
    }
    const int kMVCCVersionTimestampSize = 12;
    std::string EncodeKey(const rocksdb::Slice& key, int64_t wall_time, int32_t logical) {
        std::string s;
        const bool ts = wall_time != 0 || logical != 0;
        s.reserve(key.size() + 1 + (ts ? 1 + kMVCCVersionTimestampSize : 0));
        s.append(key.data(), key.size());
        if (ts) {
            // Add a NUL prefix to the timestamp data. See DBPrefixExtractor.Transform
            // for more details.
            s.push_back(0);
            EncodeTimestamp(s, wall_time, logical);
        }
        s.push_back(char(s.size() - key.size()));
        return s;
    }


    class DBComparator : public rocksdb::Comparator {
    public:
        DBComparator() = default;

        virtual const char* Name() const override { return "bini_comparator"; }

        virtual int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override;
        virtual bool Equal(const rocksdb::Slice& a, const rocksdb::Slice& b) const override;
        virtual void FindShortestSeparator(std::string* start,
                                           const rocksdb::Slice& limit) const override;
        virtual void FindShortSuccessor(std::string* key) const override;
    };

    const DBComparator kComparator;

    inline bool SplitKey(rocksdb::Slice buf, rocksdb::Slice* key,
                         rocksdb::Slice* timestamp) {
        if (buf.empty()) {
            return false;
        }
        const char ts_size = buf[buf.size() - 1];
        if ((uint8_t)ts_size >= buf.size()) {
            return false;
        }
        *key = rocksdb::Slice(buf.data(), buf.size() - ts_size - 1);
        *timestamp = rocksdb::Slice(key->data() + key->size(), ts_size);
        return true;
    }

    int DBComparator::Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const {
        rocksdb::Slice key_a, key_b;
        rocksdb::Slice ts_a, ts_b;
        if (!SplitKey(a, &key_a, &ts_a) || !SplitKey(b, &key_b, &ts_b)) {
            // This should never happen unless there is some sort of corruption of
            // the keys.
            return a.compare(b);
        }

        const int c = key_a.compare(key_b);
        if (c != 0) {
            return c;
        }
        if (ts_a.empty()) {
            if (ts_b.empty()) {
                return 0;
            }
            return -1;
        } else if (ts_b.empty()) {
            return +1;
        }
        return ts_b.compare(ts_a);
    }

    bool DBComparator::Equal(const rocksdb::Slice& a, const rocksdb::Slice& b) const { return a == b; }

    namespace {

        void ShrinkSlice(rocksdb::Slice* a, size_t size) { a->remove_suffix(a->size() - size); }

        size_t SharedPrefixLen(const rocksdb::Slice& a, const rocksdb::Slice& b) {
            auto n = std::min(a.size(), b.size());
            size_t i = 0;
            for (i = 0; i < n && a[i] == b[i]; ++i) {
            }
            return i;
        }

        bool FindSeparator(rocksdb::Slice* a, std::string* a_backing, const rocksdb::Slice& b) {
            auto prefix = SharedPrefixLen(*a, b);
            auto n = std::min(a->size(), b.size());
            if (prefix >= n) {
                // The > case is not actually possible.
                assert(prefix == n);
                // One slice is a prefix of another.
                return false;
            }
            // prefix < n. So can look at the characters at prefix, where they differed.
            if (static_cast<unsigned char>((*a)[prefix]) >= static_cast<unsigned char>(b[prefix])) {
                // == is not possible since they differed.
                assert((*a)[prefix] != b[prefix]);
                // So b is smaller than a.
                return false;
            }
            if ((prefix < b.size() - 1) ||
                static_cast<unsigned char>((*a)[prefix]) + 1 < static_cast<unsigned char>(b[prefix])) {
                // a and b do not have consecutive characters at prefix.
                (*a_backing)[prefix]++;
                ShrinkSlice(a, prefix + 1);
                return true;
            }
            // They two slices have consecutive characters at prefix, so we leave the
            // character at prefix unchanged for a. Now we are free to increment any
            // subsequent character in a, to make the new a bigger than the old a.
            ++prefix;
            for (size_t i = prefix; i < a->size() - 1; ++i) {
                if (static_cast<unsigned char>((*a)[i]) != 0xff) {
                    (*a_backing)[i]++;
                    ShrinkSlice(a, i + 1);
                    return true;
                }
            }
            return false;
        }

    }  // namespace

    void DBComparator::FindShortestSeparator(std::string* start, const rocksdb::Slice& limit) const {
        rocksdb::Slice key_s, key_l;
        rocksdb::Slice ts_s, ts_l;
        if (!SplitKey(*start, &key_s, &ts_s) || !SplitKey(limit, &key_l, &ts_l)) {
            return;
        }
        auto found = FindSeparator(&key_s, start, key_l);
        if (!found)
            return;
        start->resize(key_s.size() + 1);
        (*start)[key_s.size()] = 0x00;
    }

    void DBComparator::FindShortSuccessor(std::string* key) const {
        rocksdb::Slice k, ts;
        if (!SplitKey(*key, &k, &ts)) {
            return;
        }
        for (size_t i = 0; i < k.size(); ++i) {
            if (static_cast<unsigned char>(k[i]) != 0xff) {
                (*key)[i]++;
                key->resize(i + 2);
                (*key)[i + 1] = 0;
                return;
            }
        }
    }



    TEST_F(DBMemTableTest, flushManager) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        options.cf_paths.emplace_back(dbname_, 1);
        ImmutableCFOptions ioptions(options);
        ioptions.info_log = new StdOutLogger(InfoLogLevel::DEBUG_LEVEL);
        WriteBufferManager wb(options.db_write_buffer_size);
        auto mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                kMaxSequenceNumber, 0 /* column_family_id */);

        // Write some keys and make sure it returns false on duplicates
        bool res;
        // Test the duplicate keys under stress
        char tmp[1000];
        memset(tmp, 70, 1000);
        SequenceNumber seq = 1000000000;
        int record_sum = 200000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
        }

//        std::this_thread::sleep_for(std::chrono::seconds(5));
        delete mem;
    }

    TEST_F(DBMemTableTest, Expand) {
        Options options = CurrentOptions();
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000] = {'0'};
        SequenceNumber seq = 900000;
        int record_sum = 64000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            // std::cout << "a : " << a << ", a.data()" << a.data() << ", a.length:" <<
            // a.length()<< std::endl;
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }
        std::cout << "-----------insert all records ok." << std::endl;
        // std::this_thread::sleep_for(std::chrono::milliseconds(1000));

        seq = 900000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }

        std::cout << "delete mem" << std::endl;
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

    TEST_F(DBMemTableTest, HugeDataInsert) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000];
        memset(tmp, 255, 1000);
        SequenceNumber seq = 100000;
        int record_sum = 150000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
        }

        std::cout << "-----------insert all records ok." << std::endl;
        // std::this_thread::sleep_for(std::chrono::milliseconds(5000));

        seq = 100000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }

        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

// RandomDataInsert
    TEST_F(DBMemTableTest, RandomDataInsert) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000];
        memset(tmp, 255, 1000);
        srand(time(NULL));
        SequenceNumber seq = 0;
        int record_sum = 5000;
        for (int i = 0; i < record_sum; i++) {
            int key = rand() % 10 + 10;
            seq = seq + key;
            std::string a = std::to_string(seq) + "_test";
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);

            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }

        std::cout << "-----------insert all records ok." << std::endl;

        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

    void insertAndGet(std::string str, MemTable* mem) {
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000];
        memset(tmp, 255, 1000);
        SequenceNumber seq = 1000000;
        int record_sum = 100000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + str;
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
        }

        seq = 1000000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + str;
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }

        std::cout << "-----------insert all this thread records ok." << std::endl;
    }
// Insert data for multiple threads
    TEST_F(DBMemTableTest, MultithreadInsert) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);

        std::thread test1(insertAndGet, "test", mem);
        std::thread test2(insertAndGet, "_tast", mem);
        std::thread test3(insertAndGet, "_ttst1", mem);
        std::thread test4(insertAndGet, "_ttst2", mem);
        std::thread test5(insertAndGet, "_ttst3", mem);
        std::thread test6(insertAndGet, "_ttst4", mem);
        std::thread test7(insertAndGet, "_ttst5", mem);
        std::thread test8(insertAndGet, "_ttst6", mem);
        //      insertAndGet("test");
        //      insertAndGet("tast");
        //      insertAndGet("twst");
        test1.join();
        test2.join();
        test3.join();
        test4.join();
        test5.join();
        test6.join();
        test7.join();
        test8.join();
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

// The first step is to change the Manager's internal disassembly and expansion
// event to direct call processing logic.
    TEST_F(DBMemTableTest, HugeDataInsertWithoutThread) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000];
        memset(tmp, 255, 1000);
        SequenceNumber seq = 10000000;
        int record_sum = 300000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            res = mem->Add(seq, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);

            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }

        std::cout << "-----------insert all records ok." << std::endl;

        seq = 10000000;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_test";
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }
        std::cout << "-----------get all records ok." << std::endl;
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

// Deletion and handle for cut
    TEST_F(DBMemTableTest, Deletion) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        bool res;
        char tmp[1000] = {'0'};
        SequenceNumber seq = 1000000;
        for (int i = 0; i < 100000; i++) {
            std::string a = std::to_string(seq) + "_test";
            std::string b = std::to_string(seq) + "_tast";
            std::string c = std::to_string(seq) + "_trst";
            std::string d = std::to_string(seq) + "_tgst";
            std::string value;
            Status s;
            ReadOptions read_opts;
            MergeContext merge_context;

            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeDeletion, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeDeletion, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(c.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
        }
        std::cout << "-----------insert all records ok." << std::endl;
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }

// Deletion
    TEST_F(DBMemTableTest, DeletionWrongCase1) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        bool res;
        char tmp[1000] = {'0'};
        SequenceNumber seq = 10000;
        for (int i = 0; i < 8000; i++) {
            std::string a = std::to_string(seq) + "_1";
            std::string b = std::to_string(seq) + "_2";
            std::string c = std::to_string(seq) + "_3";
            std::string d = std::to_string(seq) + "_4";
            std::string value;
            Status s;
            ReadOptions read_opts;
            MergeContext merge_context;

            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeDeletion, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(c.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
        }

        delete mem;
        std::cout << "---------------------------------------------end ..." << std::endl;
    }

    TEST_F(DBMemTableTest, RangeDeletion) {
        Options options;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
//        options.encode_version_node = false;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "Enter the loop" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000] = {'0'};
        SequenceNumber seq = 10000;
        std::string value;
        Status s;
        ReadOptions read_opts;
        SequenceNumber sq = 0;
        MergeContext merge_context;

        for (int i = 0; i < 100000; i++) {
            seq++;
            std::string a = std::to_string(seq) + "_tast";
            std::string b = std::to_string(seq) + "_tbst";
            std::string c = std::to_string(seq) + "_tcst";
            std::string d = std::to_string(seq) + "_tdst";
            std::string e = std::to_string(seq) + "_test";
            std::string f = std::to_string(seq) + "_tfst";
            std::string g = std::to_string(seq) + "_tgst";
            std::string h = std::to_string(seq) + "_thst";
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(c.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(d.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(e.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(f.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(g.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(h.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeRangeDeletion, genMVCCKey1(a.data()), Slice(c));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(b.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(a.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Add(seq++, kTypeValue, genMVCCKey1(c.data()), Slice(tmp, 1000));
            ASSERT_TRUE(res);
            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq - 13), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_FALSE(res);

            res = mem->Get(LookupKey(genMVCCKey1(b.data()), seq - 12), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_FALSE(res);

            res = mem->Get(LookupKey(genMVCCKey1(a.data()), seq - 4), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);

            res = mem->Get(LookupKey(genMVCCKey1(d.data()), seq - 10), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
        }

        delete mem;
        std::cout << "---------------------------------------------------end " << std::endl;
    }

    TEST_F(DBMemTableTest, HashIndex) {
        Options options = CurrentOptions();
//        options.encode_version_node = false;
        // Create a MemTable
        InternalKeyComparator cmp(&kComparator);
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
        options.cf_paths.emplace_back(dbname_, 0);
        //options.comparator = & kComparator;
        ImmutableCFOptions ioptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "进入循环" << std::endl;
        // Test the duplicate keys under stress
        char tmp[1000] = {'0'};
        SequenceNumber seq = 10000;
        int record_sum = 1;
        for (int i = 0; i < record_sum; i++) {
            seq++;
            std::string a = "test";
            // std::cout << "a : " << a << ", a.data()" << a.data() << ", a.length:" <<
            // a.length()<< std::endl;
            res = mem->Add(seq, kTypeValue, EncodeKey(Slice(a), 100,0) , Slice(tmp, 1000));
            ASSERT_TRUE(res);
            std::string value;
            Status s;
            ReadOptions read_opts;
            SequenceNumber sq = 0;
            MergeContext merge_context;
            read_opts.is_mvcc = true;
            res = mem->Get(LookupKey(EncodeKey(Slice(a), 2000,0), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 1000);
        }
        std::cout << "-----------insert all records ok." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));



        std::cout << "delete mem" << std::endl;
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }


    char *randstr(char *str, const int len)
    {

        int i;
        for (i = 0; i < len; ++i)
        {
            switch ((rand() % 3))
            {
                case 1:
                    str[i] = 'A' + rand() % 26;
                    break;
                case 2:
                    str[i] = 'a' + rand() % 26;
                    break;
                default:
                    str[i] = '0' + rand() % 10;
                    break;
            }
        }
        str[++i] = '\0';
        return str;
    }

    TEST_F(DBMemTableTest, HashIndexPerformance) {
        Options options = CurrentOptions();
//        options.encode_version_node = false;
        // Create a MemTable
        InternalKeyComparator cmp(BytewiseComparator());
        options.memtable_factory = std::make_shared<PureMemFactory>();
        options.pureMemTable = true;
        options.cf_paths.emplace_back(dbname_, 0);
        ImmutableCFOptions iOptions(options);
        WriteBufferManager wb(options.db_write_buffer_size);
        MemTable* mem = new MemTable(cmp, iOptions, MutableCFOptions(options), &wb,
                                     kMaxSequenceNumber, 0 /* column_family_id */);
        std::cout << "memtable create ok." << std::endl;
        // Write some keys and make sure it returns false on duplicates
        bool res;
        std::cout << "进入循环" << std::endl;

        std::vector<std::string> keys;

        int N = 200000;
        srand(time(NULL));
        for(int i=0; i<N; i++){
            char key[20];
            keys.push_back(randstr(key, 14));
        }

        clock_t startTime,addTime, getTime;
        char tmp[1000] = {'0'};
        SequenceNumber seq = 10000;
        startTime = clock();
        for(int i=0; i<N; i++){
            seq++;
            ASSERT_TRUE(mem->Add(seq, kTypeValue, genMVCCKey1(keys[i].data()), Slice(tmp, 104)));
            seq++;
            ASSERT_TRUE(mem->Add(seq, kTypeValue, genMVCCKey1(keys[i].data()), Slice(tmp, 105)));
        }

        addTime = clock();

        std::cout << "addTime: " <<(double)(addTime - startTime)*1000 / CLOCKS_PER_SEC << "ms" << std::endl;

        std::string value;
        Status s;
        SequenceNumber sq = 0;
        seq = 10000;
        ReadOptions read_opts;
        MergeContext merge_context;

        for(int i=0; i<N; i++){
            seq++;
            res = mem->Get(LookupKey(genMVCCKey1(keys[i].data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 104);
            seq++;

            res = mem->Get(LookupKey(genMVCCKey1(keys[i].data()), seq), &value, &s,
                           &merge_context, &sq, &sq, read_opts);
            ASSERT_TRUE(res);
            ASSERT_EQ(value.length(), 105);
        }

        getTime = clock();
        std::cout << "getTime: " <<(double)(getTime - addTime)*1000 / CLOCKS_PER_SEC << "ms" << std::endl;


        std::cout << "delete mem" << std::endl;
        delete mem;
        std::cout << "-------------------------------------------------------end ..."
                  << std::endl;
    }
} // namespace rocksdb

int main(int argc, char** argv) {
    rocksdb::port::InstallStackTraceHandler();
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
